package org.lessim.socket;

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;
import org.springframework.web.socket.*;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;

/**
 * @date: 2021/12/1 3:41 下午
 * @author: Menjoe
 */
@Slf4j
public class MessageHandler implements WebSocketHandler {

    /**
     * 前台连接并且注册了账户
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) {
        log.info("Websocket: " + session.getId() + " 已经接入");
        Long uid = (Long) session.getAttributes().get(SysConstants.UID);
        SessionPool.put(uid, session);
    }

    @Override
    public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
        if (message.getPayloadLength() == 0) {
            return;
        }
        NotificationMessage msg = JSON.parseObject(message.getPayload().toString(), NotificationMessage.class);
        msg.setDate(System.currentTimeMillis());
        this.sendMessageToUser(msg.getTo(), new TextMessage(JSON.toJSONString(msg)));
    }

    /**
     * 消息传输错误处理，如果出现错误直接断开连接
     */
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        if (session.isOpen()) {
            session.close();
        }
        this.removeWebSocketUser(session);
    }

    /**
     * 关闭连接后
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) {
        log.info("Websocket: " + session.getId() + " 已经关闭");
        this.removeWebSocketUser(session);
    }

    @Override
    public boolean supportsPartialMessages() {
        return false;
    }

    /**
     * 给所有在线用户发送消息
     *
     * @param message
     * @throws IOException
     */
    public void broadcast(final TextMessage message) {
        Iterator<Map.Entry<Long, WebSocketSession>> it = SessionPool.entrySet().iterator();
        while (it.hasNext()) {
            final Map.Entry<Long, WebSocketSession> entry = it.next();
            if (entry.getValue().isOpen()) {
                new Thread(() -> {
                    try {
                        if (entry.getValue().isOpen()) {
                            entry.getValue().sendMessage(message);
                        }
                    } catch (IOException e) {
                        if (log.isDebugEnabled()) {
                            log.debug("[{}] 推送失败, Cause: {}", entry.getKey(), e.getCause());
                        }
                    }
                }).start();
            }
        }
    }

    public void sendMessageToUser(Long uid, TextMessage message) {
        new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                WebSocketSession session = SessionPool.get(uid);
                if (session != null && session.isOpen()) {
                    try {
                        session.sendMessage(message);
                        Thread.sleep(10);
                    } catch (IOException | InterruptedException e) {
                        if (log.isDebugEnabled()) {
                            log.debug("[{}] 推送失败, Cause: {}", uid, e.getCause());
                        }
                    }
                }
            }
        }).start();
    }

    public void removeWebSocketUser(WebSocketSession session) {
        String sessionId = session.getId();
        if (!StringUtils.hasText(sessionId)) {
            return;
        }
        SessionPool.remove(sessionId);
    }

}
